package cn.doitedu.ml.demo

import cn.doitedu.commons.util.SparkUtil
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders}

import scala.collection.Map
// 将数据封装成自定义的case class Person
case class Person(id:Int,name:String,age:Int)

object SparkSQLApi {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val spark = SparkUtil.getSparkSession("")

    // 1. 创建Dataset
    // Dataset的本质是 :RDD+schema
    // RDD[Type] ： RDD[Row] RDD[String]  RDD[Int] RDD[Array[String]] RDD[()] RDD[case class]

    /**
      * 1,zs,28
      * 2,ls,39
      */
    val ds: Dataset[String] = spark.read.textFile("userprofile/data/demo/sparksqldemo/demo1.txt")
    ds.printSchema()
    /**
      * root
      * |-- value: string (nullable = true)
      */
    ds.show()
    /**
      * +-------+
      * |  value|
      * +-------+
      * |1,zs,28|
      * |2,ls,39|
      * +-------+
      */
    // 思考1： 上面的ds，能不能把字段名value 改成 str ？
    val ds2: DataFrame = ds.withColumnRenamed("value","str")
    ds2.printSchema()
    /**
      * root
      * |-- str: string (nullable = true)
      */

    //
    //
    // String，可能是个别的，RDD[别的]
    //
    //
    /**
      *
      * 思考2：能不能把上面的ds中的字符串切割成数组？
      * 这里要 import spark.implicits._ ，为什么？
      * 因为，ds[String].map()操作，map中的运算所得到的结果不一定还是一个
      * 而ds.map方法最后返回的还是一个DataSet[别的]
      * 而RDD[别的] 变成  DataSet[别的]，是需要Encoder的
      *
      * Encoder的作用是什么：  Encoder就是将用户返回的数据类型（比如String，比如Caseclass，比如Map）解构出字段名、字段类型，并能对数据进行序列化
      */

    import spark.implicits._
    val ds3: Dataset[Array[String]] = ds.map(s=>{
      val arr = s.split(",")
      arr    // 将数据封装一个数组对象来返回
    })

    ds3.printSchema()
    ds3.show()
    /**
      * root
      * |-- value: array (nullable = true)
      * |    |-- element: string (containsNull = true)
      *
      * 说明SQLImplicits中的Array数据类型的Encoder，只能将array解构成一个字段
      *
      */

    /**
      *
      *
      * 手动定义Encoder，并为map方法显式传入Encoder示例：
      */
    import scala.reflect.runtime.universe.TypeTag
    implicit def myMapEncoder[T <: Map[_, _] : TypeTag]: Encoder[T] = ExpressionEncoder()
    val ds4: Dataset[Map[String, (String, String)]] = ds.map(s=>{
      val arr = s.split(",")
      Map(arr(0)->(arr(1),arr(2))) // 将数据封装一个Map对象来返回
    })(myMapEncoder)  // newMapEncoder是SQLImplicits对象中的一个隐式方法，此处可以让编译器自动填，也可以手动填
    ds4.printSchema()   // ds4这张“表”中，有几个字段？
    ds4.show()


    // 将数据封装成Tuple（本质上是case class）
    val ds5: Dataset[(String, String, String)] = ds.map(s=>{
      val arr = s.split(",")
      val tp = (arr(0),arr(1),(arr(2)))  // 将数据封装一个tuple来返回
      tp                                 // tupleEncoder就能根据你的tupleN类型，反射出这里面有几个字段，来生成对应的表结构schema
    })
    ds5.printSchema()


    // 将数据封装成自定义case class
    val ds6: Dataset[Person] = ds.map(s=>{
      val arr = s.split(",")
      val person = Person(arr(0).toInt,arr(1),arr(2).toInt)
      person
    })
    ds6.printSchema()
    /**
      * 解构的方式：case class对象中有几个成员，生成的表结构中就有几个字段； 成员变量名叫什么，字段名就叫什么； 成员变量类型是什么，字段的类型就是什么
      * root
      * |-- id: integer (nullable = false)
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = false)
      */

    // 将数据封装成自定义的java对象
    val ds7: Dataset[JavaPerson] = ds.map(s=>{
      val arr = s.split(",")
      val person = new JavaPerson(arr(0).toInt,arr(1),arr(2).toInt)
      person
    })(Encoders.bean(classOf[JavaPerson]))   // Java对象，在SQLImplicits中不存在对应的隐式Encoder，只能通过手动传入
    ds7.printSchema()



    // 将数据封装成自定义的普通scala对象
    val ds8: Dataset[ScalaPerson] = ds.map(s=>{
      val arr = s.split(",")
      val person = new ScalaPerson(arr(0).toInt,arr(1),arr(2).toInt)
      person
    })(Encoders.bean(classOf[ScalaPerson]))    //
    ds8.printSchema()


    val rdd9: RDD[ScalaPerson] = ds.rdd.map(s=>{
      val arr = s.split(",")
      val person = new ScalaPerson(arr(0).toInt,arr(1),arr(2).toInt)
      person
    })

    spark.close()

  }

}
